Skip to content

Conversation

@Bilna
Copy link

@Bilna Bilna commented Dec 30, 2014

Please review the unit test for MQTT

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@prabeesh
Copy link
Contributor

@tdas verify this patch

@tdas
Copy link
Contributor

tdas commented Dec 31, 2014

Jenksin, this is ok to test.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24922 has started for PR 3844 at commit e8b6623.

  • This patch merges cleanly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can cause the SparkContext to be not shutdown if there is an exception in the unit test, causing a leaked SparkContext. Take a look at how it is done in the KafkaStreamSuite with BeforeAndAfter.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24922 has finished for PR 3844 at commit e8b6623.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging
    • class MQTTStreamSuite extends MQTTStreamSuiteBase

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24922/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24946 has started for PR 3844 at commit 5f6bfd2.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24948 has started for PR 3844 at commit b1ac4ad.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24949 has started for PR 3844 at commit 4b58094.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24946 has finished for PR 3844 at commit 5f6bfd2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24946/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24948 has finished for PR 3844 at commit b1ac4ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24948/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24949 has finished for PR 3844 at commit 4b58094.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24949/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24956 has started for PR 3844 at commit fc8eb28.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 31, 2014

Test build #24956 has finished for PR 3844 at commit fc8eb28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24956/
Test FAILed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why is there a String.valueOf(String)?

@SparkQA
Copy link

SparkQA commented Jan 3, 2015

Test build #25007 has started for PR 3844 at commit fac3904.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Jan 3, 2015

Test build #25007 has finished for PR 3844 at commit fac3904.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25007/
Test PASSed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ummm.. isnt this new dependency necessary only for unit test (since nothing in non-test code changed)? In that case it should be added to test scope (See scalatest below). We really try to avoid changing dependencies as any such change can cause conflicts with other stuff (spark's code dependencies) causing unforeseen failures. So if this is only necessary for test, please put it in test scope.

@tdas
Copy link
Contributor

tdas commented Jan 4, 2015

Only one more comment.

@SparkQA
Copy link

SparkQA commented Jan 4, 2015

Test build #25035 has started for PR 3844 at commit acea3a3.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Jan 4, 2015

Test build #25035 has finished for PR 3844 at commit acea3a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25035/
Test PASSed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I missed this in the last pass, but this violates the Scala syntax that we follow. I wont block this PR for this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what is the correction here. Just to understand what went wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (...) {
    msgTopic.publish(message)
}

Such code block should either be in one line or be within braces.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.. thanks.

@tdas
Copy link
Contributor

tdas commented Jan 5, 2015

Merging this, thanks!

@asfgit asfgit closed this in e767d7d Jan 5, 2015
asfgit pushed a commit that referenced this pull request Jan 5, 2015
Please review the unit test for MQTT

Author: bilna <bilnap@am.amrita.edu>
Author: Bilna P <bilna.p@gmail.com>

Closes #3844 from Bilna/master and squashes the following commits:

acea3a3 [bilna] Adding dependency with scope test
28681fa [bilna] Merge remote-tracking branch 'upstream/master'
fac3904 [bilna] Correction in Indentation and coding style
ed9db4c [bilna] Merge remote-tracking branch 'upstream/master'
4b34ee7 [Bilna P] Update MQTTStreamSuite.scala
04503cf [bilna] Added embedded broker service for mqtt test
89d804e [bilna] Merge remote-tracking branch 'upstream/master'
fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master'
4b58094 [Bilna P] Update MQTTStreamSuite.scala
b1ac4ad [bilna] Added BeforeAndAfter
5f6bfd2 [bilna] Added BeforeAndAfter
e8b6623 [Bilna P] Update MQTTStreamSuite.scala
5ca6691 [Bilna P] Update MQTTStreamSuite.scala
8616495 [bilna] [SPARK-4631] unit test for MQTT

(cherry picked from commit e767d7d)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@Bilna
Copy link
Author

Bilna commented Jan 5, 2015

@tdas, Thanks.

@JoshRosen
Copy link
Contributor

It looks like there's maybe a port-binding / racing issue here?

https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/1356/testReport/

sbt.ForkMain$ForkError: Transport Connector could not be registered in JMX: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use
    at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:27)
    at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1977)
    at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2468)
    at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2385)
    at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:684)
    at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:642)
    at org.apache.activemq.broker.BrokerService.start(BrokerService.java:578)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$setupMQTT(MQTTStreamSuite.scala:90)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply$mcV$sp(MQTTStreamSuite.scala:53)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
    at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:195)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:37)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:37)
    at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
    at org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:37)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
    at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use
    at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
    at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:138)
    at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:60)
    at org.apache.activemq.transport.TransportFactory.bind(TransportFactory.java:124)
    at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:310)
    at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:136)
    at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:105)
    at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1972)
    ... 38 more
Caused by: sbt.ForkMain$ForkError: Address already in use
    at java.net.PlainSocketImpl.socketBind(Native Method)
    at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
    at java.net.ServerSocket.bind(ServerSocket.java:376)
    at java.net.ServerSocket.<init>(ServerSocket.java:237)
    at javax.net.DefaultServerSocketFactory.createServerSocket(ServerSocketFactory.java:231)
    at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:134)
    ... 44 more

@Bilna
Copy link
Author

Bilna commented Jan 13, 2015

ok.. I will look into it

@dragos
Copy link
Contributor

dragos commented Jan 28, 2015

There also seems to be a race condition introduced by this test. It fails consistently for me (but passes if I add a Thread.sleep(50) inside publishData). I'll open a ticket.

[info] - mqtt input stream *** FAILED *** (552 milliseconds)
[info]   org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
[info]   at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432)
[info]   at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
[info]   at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
[info]   at org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:126)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:141)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
[info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)

@srowen
Copy link
Member

srowen commented Jan 28, 2015

@dragos good catch. It sounds like an issue with the test if anything. You could just reopen SPARK-4631 with a workaround.

@dragos
Copy link
Contributor

dragos commented Jan 28, 2015

@srowen I commented on the ticket, but I can't re-open it.

@dragos
Copy link
Contributor

dragos commented Jan 29, 2015

See #4270

asfgit pushed a commit that referenced this pull request Feb 25, 2015
modified to adhere to accepted coding standards as pointed by tdas in PR #3844

Author: prabs <prabsmails@gmail.com>
Author: Prabeesh K <prabsmails@gmail.com>

Closes #4178 from prabeesh/master and squashes the following commits:

bd2cb49 [Prabeesh K] adress the comment
ccc0765 [prabs] adress the comment
46f9619 [prabs] adress the comment
c035bdc [prabs] adress the comment
22dd7f7 [prabs] address the comments
0cc67bd [prabs] adress the comment
838c38e [prabs] adress the comment
cd57029 [prabs] address the comments
66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence
5857989 [prabs] modified to adhere to accepted coding standards
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants